/** @var TransactionProfiler */
protected $trxProfiler;
+ /** @var int */
+ protected $nonNativeInsertSelectBatchSize = 10000;
+
/**
* Constructor and database handle and attempt to connect to the DB server
*
$this->queryLogger = $params['queryLogger'];
$this->errorLogger = $params['errorLogger'];
+ if ( isset( $params['nonNativeInsertSelectBatchSize'] ) ) {
+ $this->nonNativeInsertSelectBatchSize = $params['nonNativeInsertSelectBatchSize'];
+ }
+
// Set initial dummy domain until open() sets the final DB/prefix
$this->currentDomain = DatabaseDomain::newUnspecified();
* - cliMode: Whether to consider the execution context that of a CLI script.
* - agent: Optional name used to identify the end-user in query profiling/logging.
* - srvCache: Optional BagOStuff instance to an APC-style cache.
+ * - nonNativeInsertSelectBatchSize: Optional batch size for non-native INSERT SELECT emulation.
* @return Database|null If the database driver or extension cannot be found
* @throws InvalidArgumentException If the database driver or extension cannot be found
* @since 1.18
return false;
}
- $rows = [];
- foreach ( $res as $row ) {
- $rows[] = (array)$row;
+ try {
+ $affectedRowCount = 0;
+ $this->startAtomic( $fname );
+ $rows = [];
+ $ok = true;
+ foreach ( $res as $row ) {
+ $rows[] = (array)$row;
+
+ // Avoid inserts that are too huge
+ if ( count( $rows ) >= $this->nonNativeInsertSelectBatchSize ) {
+ $ok = $this->insert( $destTable, $rows, $fname, $insertOptions );
+ if ( !$ok ) {
+ break;
+ }
+ $affectedRowCount += $this->affectedRows();
+ $rows = [];
+ }
+ }
+ if ( $rows && $ok ) {
+ $ok = $this->insert( $destTable, $rows, $fname, $insertOptions );
+ if ( $ok ) {
+ $affectedRowCount += $this->affectedRows();
+ }
+ }
+ if ( $ok ) {
+ $this->endAtomic( $fname );
+ $this->affectedRowCount = $affectedRowCount;
+ } else {
+ $this->rollback( $fname, self::FLUSHING_INTERNAL );
+ $this->affectedRowCount = 0;
+ }
+ return $ok;
+ } catch ( Exception $e ) {
+ $this->rollback( $fname, self::FLUSHING_INTERNAL );
+ $this->affectedRowCount = 0;
+ throw $e;
}
-
- return $this->insert( $destTable, $rows, $fname, $insertOptions );
}
/**
isset( $sql['selectOptions'] ) ? $sql['selectOptions'] : [],
isset( $sql['selectJoinConds'] ) ? $sql['selectJoinConds'] : []
);
- $this->assertLastSqlDb( implode( '; ', [ $sqlSelect, $sqlInsert ] ), $dbWeb );
+ $this->assertLastSqlDb( implode( '; ', [ $sqlSelect, 'BEGIN', $sqlInsert, 'COMMIT' ] ), $dbWeb );
}
public static function provideInsertSelect() {
];
}
+ public function testInsertSelectBatching() {
+ $dbWeb = new DatabaseTestHelper( __CLASS__, [ 'cliMode' => false ] );
+ $rows = [];
+ for ( $i = 0; $i <= 25000; $i++ ) {
+ $rows[] = [ 'field' => $i ];
+ }
+ $dbWeb->forceNextResult( $rows );
+ $dbWeb->insertSelect(
+ 'insert_table',
+ 'select_table',
+ [ 'field' => 'field2' ],
+ '*',
+ __METHOD__
+ );
+ $this->assertLastSqlDb( implode( '; ', [
+ 'SELECT field2 AS field FROM select_table WHERE * FOR UPDATE',
+ 'BEGIN',
+ "INSERT INTO insert_table (field) VALUES ('" . implode( "'),('", range( 0, 9999 ) ) . "')",
+ "INSERT INTO insert_table (field) VALUES ('" . implode( "'),('", range( 10000, 19999 ) ) . "')",
+ "INSERT INTO insert_table (field) VALUES ('" . implode( "'),('", range( 20000, 25000 ) ) . "')",
+ 'COMMIT'
+ ] ), $dbWeb );
+ }
+
/**
* @dataProvider provideReplace
* @covers Wikimedia\Rdbms\Database::replace